File: //opt/alt/python37/lib/python3.7/site-packages/clselect/cluseroptselect.py
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import os
import base64
import re
from builtins import map
from future.utils import iteritems
from .cluserextselect import ClUserExtSelect
from .clselectexcept import ClSelectExcept
from clcommon import clcaptain
from . import utils
from xml.sax.saxutils import unescape
from clcommon.utils import ExternalProgramFailed
from clcommon.php_conf_reader import PhpConfReader, PhpConfBaseException,\
PhpConfReadError, PhpConfLoadException, PhpConfNoSuchAlternativeException
class ClUserOptSelect(ClUserExtSelect):
    """
    Class for processing user options.

    Extends ClUserExtSelect with methods that insert, delete, list,
    reset and back up the custom options stored in a user's ini file.
    """
    # Config file handed to PhpConfReader to build the options whitelist
    # (this is not php.ini itself).
    OPTIONS_PATH = '/etc/cl.selector/php.conf'
def __init__(self, item='php', exclude_pid_list=None):
    """
    Initialize the parent selector and the empty option caches
    @param item: string, forwarded to ClUserExtSelect.__init__
    @param exclude_pid_list: forwarded to ClUserExtSelect.__init__
    """
    ClUserExtSelect.__init__(self, item, exclude_pid_list)
    # Filled lazily by _get_whitelist / _load_whitelist
    self._whitelist = {}
    self._user_excludes = set()
    # Plain character -> HTML entity. The values must be the entity
    # spellings: with the decoded characters in their place the table is
    # an identity map (and '"""' does not even parse).
    # NOTE(review): '&apos;' and '&nbsp;' are the reconstructed spellings
    # for the quote and space entries - confirm against the PHP output.
    self._html_escape_table = {" ": "&nbsp;", '"': "&quot;", "'": "&apos;",
                               ">": "&gt;", "<": "&lt;", "&": "&amp;"}
    # Reverse map (entity -> character), passed to saxutils.unescape()
    self._html_unescape_table = {v: k for k, v in iteritems(self._html_escape_table)}
def insert_options(self, user, version,
                   optset, decoder, append=False, quiet=True, create=True):
    """
    Parses an option string, filters it against the whitelist
    and stores the result through insert_json_options
    @param user: string
    @param version: string
    @param optset: string, comma separated 'name:value' items
    @param decoder: string, decoder name applied to every item
    @param append: bool, forwarded to insert_json_options
    @param quiet: bool, forwarded to _remove_forbidden_options
    @param create: bool, forwarded to insert_json_options
    """
    if optset == '':
        # An empty option string is stored as-is, without whitelist lookup
        accepted = {}
    else:
        parsed = self._process_option_string(
            optset=optset, decoder=decoder, expect_separator=True)
        accepted = self._remove_forbidden_options(parsed, version, quiet)
    self.insert_json_options(user, version, accepted, append, create)
def insert_json_options(self, user, version,
                        options, append=False, create=True):
    """
    Writes the given options into the user's ini file, reloads the
    user's processes and refreshes the backup copy
    @param user: string
    @param version: string
    @param options: dict of option name -> value
    @param append: bool, merge into the stored options instead of
                   replacing them
    @param create: bool, forwarded to _write_to_file and _backup_settings
    """
    self._check_user_in_cagefs(user)
    ini_path = self._compose_user_ini_path(user, version)
    stored_lines, extensions, extensions_data = self._load_ini_contents(ini_path)
    current = self._prepare_options_data(stored_lines)
    if append:
        current.update(options)
    else:
        current = options
    options_set = self._compose_options_set(current)
    if options_set:
        options_set = self._wrap_options(options_set)
    output = self._compose_output_data(options_set, extensions, extensions_data)
    # A directive whose value is the 'no value' placeholder is written
    # with an empty value instead
    for position, line in enumerate(output):
        pieces = line.split('=')
        if len(pieces) == 2 and pieces[1] == 'no value':
            output[position] = pieces[0] + '='
    text = '\n'.join(output).rstrip() + '\n'
    self._write_to_file(user, text, ini_path, create)
    self._reload_processes(user)
    self._backup_settings(user, version, options_set, create)
def delete_options(self, user, version,
                   optset, decoder, quiet=True):
    """
    Deletes supplied options from current ones
    @param user: string
    @param version: string
    @param optset: string, comma separated option names (a ':value'
                   part is allowed but not required)
    @param decoder: string, decoder name applied to every item
    @param quiet: bool, accepted for interface compatibility; not used
    """
    options = self._process_option_string(
        optset=optset, decoder=decoder, expect_separator=False)
    self._check_user_in_cagefs(user)
    user_ini_path = self._compose_user_ini_path(user, version)
    (contents, extensions,
     extensions_data) = self._load_ini_contents(user_ini_path)
    contents = self._prepare_options_data(contents)
    for opt in options:
        contents.pop(opt, None)
    options_set = self._compose_options_set(contents)
    # Same rule as insert_json_options / backup_php_options: do not emit
    # an empty start/end marker pair when no custom option is left
    if options_set:
        options_set = self._wrap_options(options_set)
    data = self._compose_output_data(
        options_set, extensions, extensions_data)
    self._write_to_file(
        user, '\n'.join(data).rstrip() + '\n', user_ini_path)
    self._reload_processes(user)
    self._backup_settings(user, version, options_set)
def get_options(self, user, version=None):
    """
    Builds the options summary for a user: the whitelist enriched with
    interpreter defaults and the user's own values
    @param user: string
    @param version: string, falls back to the user's selected version
    return: dict
    """
    selected = version or self.get_version(user)[0]
    if selected == 'native':
        raise ClSelectExcept.UnableToGetExtensions(selected)
    # Both calls update self._whitelist in place
    self._get_ini_defaults(selected)
    self._get_user_ini(user, selected)
    return self._get_whitelist(selected)
def reset_options(self, users=None, versions=None):
    """
    Drops every custom option by storing an empty option set
    @param users: list, restrict the reset to these users (all if empty)
    @param versions: list, restrict the reset to these versions
                     (all if empty)
    """
    all_users = self.list_all_users()
    alternatives = self.get_all_alternatives_data()
    for alt_version in alternatives:
        if versions and alt_version not in versions:
            continue
        for name in all_users:
            if not users or name in users:
                try:
                    self.insert_options(
                        user=name, version=alt_version, optset='',
                        decoder='plain', append=False, quiet=True,
                        create=False)
                except ClSelectExcept.NotCageFSUser:
                    # Users outside CageFS are skipped
                    pass
def _prepare_options_data(self, contents):
options = {}
for item in contents:
if item.strip() == "":
continue
if item.startswith(';>===') or item.startswith(';<==='):
continue
key, value = list(map((lambda x:x.strip()), item.split('=', 1)))
if value == '':
value = 'no value'
options.update({key: value})
return options
def _get_whitelist(self, version):
"""
Returns whitelist data
"""
if not self._whitelist:
self._load_whitelist(version)
return self._whitelist
def _load_whitelist(self, version):
    """
    Reads the selector options config (not php.ini!) and merges the
    options of the given version into self._whitelist
    @param version: string, must contain a dot
    """
    alternatives = self.get_all_alternatives_data()
    self._check_alternative(version, alternatives)
    if '.' not in version:
        raise ClSelectExcept.UnableToGetExtensions(version)
    # Short to full PHP version map. Example: {'4.4': '4.4.9'}
    full_versions = {short: info['version']
                     for short, info in iteritems(alternatives)}
    try:
        reader = PhpConfReader(self.OPTIONS_PATH)
        self._whitelist.update(
            reader.get_config_for_selectorctl(version, full_versions))
    except PhpConfNoSuchAlternativeException as e:
        raise ClSelectExcept.UnableToGetExtensions(e.php_version)
    except (PhpConfReadError, PhpConfLoadException, PhpConfBaseException) as e:
        raise ClSelectExcept.UnableToLoadData(self.OPTIONS_PATH, str(e))
def _handle_option_item(option_item, expect_separator=True):
"""
Splits options data into key-value pair and returns it
@param option_item: string
@param expect_separator: bool
@return: dict
"""
if ':' in option_item:
option_name, option_value = option_item.split(':', 1)
else:
if not expect_separator:
option_name, option_value = option_item, ''
else:
raise ClSelectExcept.WrongData(
"Colon as a separator expected (%s)!" % (option_item,))
return {option_name: option_value}
_handle_option_item = staticmethod(_handle_option_item)
def _decoder(data, decoder='plain'):
"""
Decodes option item
@param data: string
@param decoder: string
@return: string
"""
dispatcher = {
'plain': (lambda x: x),
'base64': (lambda x: base64.b64decode(x))}
try:
return dispatcher[decoder](data)
except KeyError:
return dispatcher['plain'](data)
_decoder = staticmethod(_decoder)
def _process_option_string(cls, optset, decoder='plain', expect_separator=True):
"""
Wrapper around options parsing routines
@param optset: string
@param decoder: callback name
@expect_separator: bool
@return: dict
"""
options = {}
if optset:
for option_item in optset.split(','):
option_item = cls._decoder(option_item, decoder)
options.update(
cls._handle_option_item(
option_item, expect_separator))
return options
_process_option_string = classmethod(_process_option_string)
def _remove_forbidden_options(self, options, version, quiet=True):
"""
Check if all options to process are present in white list
and removes forbidden ones or raise an exception
@param options: dict
@param quiet: bool
@return: dict
"""
whitelist = self._get_whitelist(version)
if not set(options.keys()).issubset(set(whitelist.keys())):
white_list_options = {}
for opt_name, opt_value in iteritems(options):
if opt_name not in whitelist:
if quiet:
continue
else:
raise ClSelectExcept.UnableToProcessOption(opt_name)
white_list_options[opt_name] = opt_value
options = white_list_options
return options
@staticmethod
def _compose_options_set(options):
    """
    Renders every option as a 'name=value' line
    @param options: dict
    return: list
    """
    return ["%s=%s" % (name, value) for name, value in iteritems(options)]
def _wrap_options(self, contents):
"""
Adds identifying string before and after dataset
@param contents: list
"""
data = [';>=== Start of PHP Selector Custom Options ===']
data.extend(contents)
data.append(';<=== End of PHP Selector Custom Options =====')
return data
def _compose_output_data(contents, extensions, extensions_data):
"""
Construct output
@param contents: list
@param extensions: list
@param extensions_data: dict
return: list
"""
data = []
for item in extensions:
data.extend(extensions_data[item])
# Add two spacelines between each extension
data.extend(["", ""])
data.extend(contents)
return data
_compose_output_data = staticmethod(_compose_output_data)
def _check_version(self, test, version):
"""
Compares version in use and version required by PHP feature
and return true if PHP feature satisfies
"""
alternatives = self.get_all_alternatives_data()
self._check_alternative(version, alternatives)
if '.' not in version:
raise ClSelectExcept.UnableToGetExtensions(version)
v_array = list(map((lambda x: int(x)), alternatives[version]['version'].split('.')))
# if test has 2 section, add third
if len(test.split('.')) == 2:
test += '.0'
patt = re.compile(r'([<>=]{1,2})?(\d+\.\d+\.\d+)\.?')
m = patt.match(test)
if not m:
raise ClSelectExcept.NoSuchAlternativeVersion(test)
action = m.group(1)
test = list(map((lambda x: int(x)), m.group(2).split('.')))
version_int = v_array[0] << 11 | v_array[1] << 7 | v_array[2]
test_int = test[0] << 11 | test[1] << 7 | test[2]
if action == r'<' and version_int < test_int:
return True
if action == r'<=' and version_int <= test_int:
return True
if action == r'>' and version_int > test_int:
return True
if action == r'>=' and version_int >= test_int:
return True
if not action or action == r'=':
version_int = v_array[0] << 11 | v_array[1] << 7
test_int = test[0] << 11 | test[1] << 7
if version_int == test_int:
return True
return False
def _get_php_error_tbl(self, php_ver):
# http://php.net/manual/en/errorfunc.constants.php
php_error_table = {
1: 'E_ERROR',
2: 'E_WARNING',
4: 'E_PARSE',
8: 'E_NOTICE',
16: 'E_CORE_ERROR',
32: 'E_CORE_WARNING',
64: 'E_COMPILE_ERROR',
128: 'E_COMPILE_WARNING',
256: 'E_USER_ERROR',
512: 'E_USER_WARNING',
1024: 'E_USER_NOTICE',
2048: 'E_STRICT' # E_STRICT since PHP 5 but not included in E_ALL until PHP 5.4.0
}
if self._check_version('<5.2.0', php_ver):
php_error_table[2047] = 'E_ALL'
if self._check_version('>=5.2.0', php_ver):
php_error_table[4096] = 'E_RECOVERABLE_ERROR' # E_RECOVERABLE_ERROR since PHP 5.2.0
if self._check_version('<5.3.0', php_ver):
php_error_table[6143] = 'E_ALL' # E_ALL 6143 in PHP 5.2.x
if self._check_version('>=5.3.0', php_ver):
php_error_table[8192] = 'E_DEPRECATED' # E_DEPRECATED since PHP 5.3.0
php_error_table[16384] = 'E_USER_DEPRECATED' # E_USER_DEPRECATED since PHP 5.3.0
if self._check_version('<5.4.0', php_ver):
php_error_table[30719] = 'E_ALL' # E_ALL 30719 in PHP 5.3.x
if self._check_version('>=5.4.0', php_ver):
php_error_table[32767] = 'E_ALL' # E_ALL 32767 in PHP >= 5.4.x
return php_error_table
def _php_string2error(self, str_, php_ver):
"""
Convert php error level 'error-reporting' from string to code
http://php.net/manual/ru/function.error-reporting.php
#>>> ClUserOptSelect(item='php')._php_string2error('E_ALL & ~E_NOTICE', '5.4')
32759
#>>> ClUserOptSelect(item='php')._php_string2error('E_USER_ERROR | E_NOTICE', '5.4')
264
#>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | E_WARNING | E_PARSE | E_COMPILE_ERROR', '5.4')
71
#>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | INCORRECT', '5.4') # incorrect variable 'INCORRECT'
None
#>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR + E_WARNING', '5.4') # incorrect operator '+'
None
:param str: error_reporting variable
:return None|int: error_reporting error code; return None if can't convert
"""
VALID_SYMBOLS = '0123456789|&~!^ ' # http://php.net/manual/en/errorfunc.constants.php
php_error_table = self._get_php_error_tbl(php_ver)
# replacing all constants to the numbers
for code, name in iteritems(php_error_table):
str_ = str_.replace(name, str(code))
# check if str_ has only valid symbols
if set(str_).difference(set(VALID_SYMBOLS)):
return None
try:
error_code = int(eval(str_))
except (SyntaxError, ValueError, TypeError):
return None
return error_code
def _get_error_desc(self, value, version, range_):
if not re.match(r'^-?\d{1,5}$', value): # error-reporting code must be from 32767 to -32767
return ''
desc = []
value = int(value)
for error_string in range_:
if self._php_string2error(error_string, php_ver=version) == value:
return error_string
php_error_table = self._get_php_error_tbl(php_ver=version)
for error in php_error_table:
if (error & value) == error:
desc.append(php_error_table[error])
return r' | '.join(desc)
def _get_ini_defaults(self, version):
    """
    Gets PHP defaults (calls php -i) and stores them as 'default'
    entries of the whitelisted directives
    @param version: string
    """
    alternatives = self.get_all_alternatives_data()
    self._check_alternative(version, alternatives)
    whitelist = self._get_whitelist(version)
    php_binary = alternatives[version]['data'][self._item]
    if not os.path.isfile(php_binary):
        raise ClSelectExcept.NoSuchAlternativeVersion(version)
    # Work on a copy: the tweaks below are meant for the child process
    # only and must not leak into this process' own environment
    env_data = dict(os.environ)
    if 'SCRIPT_FILENAME' in env_data:
        script_path = '/usr/share/l.v.e-manager/utils/clinfo.php'
        if os.path.exists(script_path):
            env_data['SCRIPT_FILENAME'] = script_path
        cmd = [php_binary]
    else:
        cmd = [php_binary, '-qi']
    env_data.pop('SERVER_SOFTWARE', None)
    env_data['PHP_FCGI_MAX_REQUESTS'] = '1'
    env_data['PHP_FCGI_CHILDREN'] = '0'
    env_data['ACCEPT_ENCODING'] = ''
    env_data['HTTP_ACCEPT_ENCODING'] = ''
    # One table row of the output: directive name, then one or two
    # value cells
    tag_pattern = re.compile(
        r'<tr[^>]*?><td[^>]*>(.*?)</td><td[^>]*>(.*?)</td>(?:<td[^>]*>(.*?)</td>)?</tr>')
    strip_pattern = re.compile(r'<[^>]*?>')
    # Overrides go right after the binary, before any other argument
    cmd[1:1] = ['-d', 'opcache.enable_cli=0',
                '-d', 'zlib.output_compression=Off',
                '-d', 'auto_append_file=none',
                '-d', 'auto_prepend_file=none']
    output = utils.run_command(cmd, env_data)
    for row in tag_pattern.findall(output):
        directive = strip_pattern.sub('', row[0])
        if directive not in whitelist:
            continue
        # The third cell wins when it is present and non-empty.
        # convert html entries to string
        raw_value = strip_pattern.sub('', (row[2] or row[1]))
        value = unescape(raw_value, self._html_unescape_table)
        if value == 'no value':
            # Keep a non-empty default that is already known
            if ('default' in whitelist[directive] and
                    whitelist[directive]['default'] != ""):
                continue
            whitelist[directive]['default'] = ""
        else:
            if directive == 'error_reporting':
                error_range = whitelist[directive]['range'].split(',')
                value = self._get_error_desc(value, version, error_range)
            whitelist[directive]['default'] = value
    self._whitelist.update(whitelist)
def _get_user_ini(self, user, version):
"""
Parses user ini file and updates
values of existing data
@param user: string
"""
self._get_whitelist(version)
user_ini_path = self._compose_user_ini_path(user, version)
(contents, extensions,
extensions_data) = self._load_ini_contents(user_ini_path)
contents = self._prepare_options_data(contents)
for key in contents:
try:
self._whitelist[key]['value'] = contents[key]
except KeyError:
continue
def _backup_settings(self, user, version, data, create=True):
"""
On saving user settings keep backup on user homedir
@param user: string
@param version: string
@param data: list
"""
user_backup_path = os.path.join(
self._clpwd.get_homedir(user), '.cl.selector')
if not os.path.isdir(user_backup_path):
try:
clcaptain.mkdir(user_backup_path)
except (OSError, ExternalProgramFailed) as e:
raise ClSelectExcept.UnableToSaveData(user_backup_path, e)
user_backup_file = os.path.join(
user_backup_path, "alt_php%s.cfg" % version.replace('.', ''))
# replace 'no value' in directive value to empty
for idx in range(0, len(data)):
line = data[idx]
line_parts = line.split('=')
if len(line_parts) == 2 and line_parts[1] == 'no value':
data[idx] = line_parts[0] + '='
self._write_to_file(
user, '\n'.join(data), user_backup_file, create)
def backup_php_options(self, user):
    """
    Rewrites the php backup file of every alternative version with the
    options currently stored in the user's ini file
    @param user: string
    """
    self._check_user_in_cagefs(user)
    for alt_version in self.get_all_alternatives_data():
        ini_path = self._compose_user_ini_path(user, alt_version)
        stored_lines, _extensions, _extensions_data = self._load_ini_contents(ini_path)
        stored_options = self._prepare_options_data(stored_lines)
        options_set = self._compose_options_set(stored_options)
        # Marker lines are only added around a non-empty option set
        if options_set:
            options_set = self._wrap_options(options_set)
        self._backup_settings(user, alt_version, options_set)